tornado pyzmq 高性能cpu密集型服务搭建

Tornado是一款Python 服务器框架,其采用异步IO 的网络模型,具有较高性能。在Tornado 基础上为了提升Cpu 密集型应用在多核Cpu 服务器上的性能表现,可以采用Tornado 协程(异步非阻塞)+队列(通信、解耦)+多线程/多进程任务(榨干多核Cpu)的方案。

一. 简易Tornado服务

Tornado 的详细内容参考官网,这里结合实际应用介绍Tornado 的核心知识点。

Tornado 作为http 服务器,其核心模块由上至下为 HTTPServer - Application - RequestHandler 三级。简单搭建一个http 服务demo,demo 中包含一个耗时任务,用来模拟Cpu 密集型任务。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import sys
import random
import time
import os

import tornado
from tornado import ioloop
from tornado import web
import tornado.httpserver

def dot():
"""callback for showing that IOLoop is still responsive while we wait"""
sys.stdout.write('.')
sys.stdout.flush()

def slow_responder(num):
print(num)
time.sleep(random.randint(1,5))

class TestHandler(web.RequestHandler):

def get(self):
print("get request")
slow_responder(5)
res = "get http request"
print(res)
self.write(res)

def main():
application = web.Application([(r"/", TestHandler)])
server = tornado.httpserver.HTTPServer(application)
server.bind(8886)
#specify number of subprocess
server.start(1)

beat = ioloop.PeriodicCallback(dot, 100)
beat.start()

try:
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
print(' Interrupted')


if __name__ == "__main__":
main()

运行程序并在浏览器进行请求,服务器会收到字符信息“get http request”,同时控制台显示结果如下。点点点为心跳信号,证明服务器在运行状态。观察到在接收到请求后等待几秒后才返回请求,同时这段时间内心跳信号消失,说明此时程序在处理耗时任务,不能响应其他请求连接。

1
2
3
4
...................................get request
5
get http request
...............................................................................

问题来了,不是说Tronado 是高性能的网络连接库吗,这里为什么不能响应其他连接呢?

  1. Tronado 采用 epoll 技术实现高并发网络连接,与通常采用多线程或多进程并发连接的技术方案不同,Tornado 默认是单进程单线程的。即demo 中 server、application 及handler 处在同一个进程及同一个主线程中。对于高并发技术方案可参考《C10K问题》
  2. 大家对网络连接的概念没有搞清楚,默认网络连接和请求响应任务处理是同一件事。高并发仅针对网络连接而言,与任务处理无关。当10台远端服务器同时请求我的http服务时,10个连接其实都已建立,但由于任务耗时较长,第一个连接的任务尚未处理完毕,其他9个连接的任务只能等待,造成程序阻塞。
  3. 高并发(连接)与高性能其实并不等同,对于IO 密集型服务由于任务Cpu 耗时短,高并发与高性能可近似相等,但Cpu 密集型任务则不能单单考虑高并发,还需考虑具体任务的性能。

二.多进程、多线程及协程

提升服务性能通常可以通过增强程序的并行处理能力来得到提高,常用的并行方案包括多进程、多线程及协程。多进程和多线程是操作系统提供及控制的,协程是应用程序自己控制的。下面对当前情景下的几种并行方案进行分析。

1.多进程

Tornado 多进程可通过构建多个Application实例实现,仅需对代码进行简单修改:

1
2
#开启2个进程
server.start(2)

输出:

1
2
3
4
5
6
7
............get request
5
..............................get request
5
.........................................get http request
.............................get http request
....................................................

可以看出服务器处理了两个请求连接后进行输出,实现了并行。

Tornado 多进程的方式简单粗暴,但这种方案也有其缺点:

*a.系统的进程资源很宝贵,最大进程数有限。

*b.进程成本很高,多进程程序等于同一份程序复制n遍,耗费资源(内存、Cpu 上下文切换)较大。

2.多线程

多线程相比多进程占用系统资源少,但线程之间需要考虑同步及死锁等问题,同时大量线程cpu调度也是需要考虑的问题。

对于当前应用场景,最大的问题其实是Python GIL(Global Interpreter Lock),这是一把全局解释器锁,再多的线程在解释器层面其实只相当于一个线程,所以Python 中通常不采用多线程的方案。

3.协程

协程简单理解可以认为是用户态微线程。即用户自己控制(非操作系统调度)的微线程,并且多个微线程可以在单一线程下运行,没有线程切换的开销,执行效率极高。

综上,采用协程对于实现请求异步非阻塞处理是最佳方案。下面是采用协程的代码修改部分:

1
2
3
4
5
6
7
8
9
10
11
from tornado import gen  

class TestHandler(web.RequestHandler):

@gen.coroutine
def get(self):
print("get request")
slow_responder(5)
res = "get http request"
print(res)
self.write(res)

输出如下:

1
2
3
4
5
6
.............get request
5
..................get request
5
........................................get http request
.........................................................................get http request

可以看出,协程方案实现了并行效果。现在是不是可以大功告成了呢,其实差矣,因为之前说过,Tornado 默认单进程单线程,即使请求处理改造成并行,但任务处理仍是单线程执行,整体程序性能仍较低。因此需要进一步改造。

三.多线程/多进程任务+任务队列分发

现代服务器Cpu 核数可是很多的(至少8核),单进程单线程跑在一个Cpu 上岂不是浪费资源。因此最容易想到的就是将任务通过多线程最大化利用多核Cpu。多线程后面临的一个问题就是线程间消息的通信,如何将任务分发到线程及获取线程处理结果。这时就需要消息队列来进行通信和解耦。这里使用zeroMQ 作为消息队列。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import sys
import random
import threading
import time
import os

import zmq
from zmq.eventloop.future import Context as FutureContext
from zmq.devices.basedevice import ProcessDevice

import tornado
from tornado import gen
from tornado import ioloop
from tornado import web
import tornado.httpserver

def fast_worker(num):
"""thread for slowly responding to replies."""
ctx = zmq.Context()
socket = ctx.socket(zmq.DEALER)
socket.connect('tcp://127.0.0.1:5556')
i = 0
while True:
frame, msg = socket.recv_multipart()
print("\nworker received %r\n" % msg, end='')
print(num)
time.sleep(random.randint(1,5))
socket.send_multipart([frame, msg + b" to you too, #%i" % i])
i += 1


def dot():
"""callback for showing that IOLoop is still responsive while we wait"""
sys.stdout.write('.')
sys.stdout.flush()

class TestHandler(web.RequestHandler):

@gen.coroutine
def get(self):
ctx = FutureContext.instance()
s = ctx.socket(zmq.DEALER)

s.connect('tcp://127.0.0.1:5555')
# send request to worker
yield s.send(b'hello')
# finish web request with worker's reply
reply = yield s.recv()
print("\nfinishing with %r\n" % reply)
s.close()
self.write(reply)

def main():

worker2 = threading.Thread(target=fast_worker,args=(2,))
worker2.daemon=True
worker2.start()

worker1 = threading.Thread(target=fast_worker,args=(1,))
worker1.daemon=True
worker1.start()

queuedevice = ProcessDevice(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
queuedevice.bind_in('tcp://127.0.0.1:5555')
queuedevice.bind_out('tcp://127.0.0.1:5556')
queuedevice.start()

application = web.Application([(r"/", TestHandler)])
server = tornado.httpserver.HTTPServer(application)
server.bind(8886)
#specify number of subprocess
server.start(1)
beat = ioloop.PeriodicCallback(dot, 100)
beat.start()
try:
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
print(' Interrupted')


if __name__ == "__main__":
main()

输出如下:

1
2
3
4
5
6
7
8
9
10
11
.........................
worker received b'hello'
1
...
worker received b'hello'
2
...............................
finishing with b'hello to you too, #0'

...............
finishing with b'hello to you too, #0'

通过多线程+消息队列我们实现了任务并行,可充分榨干多核Cpu 性能,提升Cpu 密集型服务的处理能力。

四.总结

这种方案性能的提升留给大家自己去实践和测试。对于Cpu 密集型应用,如何最大化提升性能仍任重而道远。